package com.dahuan.sink;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.Properties;

/**
 * Flink streaming job that reads comma-separated text records from the Kafka topic
 * {@code sensor}, converts each record into a {@link SensorReading}, and writes the
 * readings to Redis through a {@link RedisSink} driven by {@link MyRedis}.
 */
public class Redis_Sink {
    /**
     * Builds the Kafka -> map -> Redis pipeline and submits it for execution.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from {@code env.execute}
     */
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        // Kafka consumer configuration.
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("group.id", "consumer-group");
        // NOTE(review): the records are decoded by the SimpleStringSchema passed to the
        // consumer below, so these two deserializer settings may have no effect — confirm.
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "latest");

        // Read raw string records from the "sensor" topic.
        DataStreamSource<String> inputStream = env.addSource( new FlinkKafkaConsumer011<String>( "sensor", new SimpleStringSchema(), prop ) );

        // Parse each record: field 0 is passed through as a String, field 1 is parsed as a
        // Long and field 2 as a Double (presumably id, timestamp, temperature — verify
        // against SensorReading).
        DataStream<SensorReading> dataStream = inputStream.map( data -> {
            String[] split = data.split( "," );
            // valueOf replaces the deprecated boxed constructors new Long(String) /
            // new Double(String); the parsing rules and resulting types are the same.
            return new SensorReading( split[0], Long.valueOf( split[1] ), Double.valueOf( split[2] ) );
        } );


        // Jedis connection-pool configuration for the Redis sink.
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig
                .Builder()
                .setHost( "localhost" )
                .setPort( 6379 )
                .build();


        dataStream.addSink( new RedisSink<>(config,new MyRedis()));

        env.execute("RedisSink");
    }
}

// Custom RedisMapper describing how a SensorReading is written to Redis.
class MyRedis implements RedisMapper<SensorReading> {

    // Additional key handed to the HSET command description (the name of the Redis hash).
    private static final String HASH_NAME = "sensor_temp";

    // Command used to store each record in a hash: hset sensor_temp id temperature
    @Override
    public RedisCommandDescription getCommandDescription() {
        RedisCommandDescription hsetDescription =
                new RedisCommandDescription(RedisCommand.HSET, HASH_NAME);
        return hsetDescription;
    }

    // Key of the entry: the reading's id.
    @Override
    public String getKeyFromData(SensorReading data) {
        String sensorId = data.getId();
        return sensorId;
    }

    // Value of the entry: the reading's temperature rendered via toString().
    @Override
    public String getValueFromData(SensorReading data) {
        String temperatureText = data.getTemperature().toString();
        return temperatureText;
    }
}